<?php

declare(strict_types=1);

namespace App\Common\Queue\Consumer;

use App\System\Model\Debug;
use Hyperf\Amqp\Message\ConsumerDelayedMessageTrait;
use Hyperf\Amqp\Message\ProducerDelayedMessageTrait;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Utils\ApplicationContext;
use Mine\Amqp\Event\AfterConsume;
use Mine\Amqp\Event\BeforeProduce;
use Mine\Amqp\Event\ConsumeEvent;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\EventDispatcher\EventDispatcherInterface;

/**
 * 公共消费者
 */
#[Consumer(exchange: "grace.exchange",routingKey: "grace.routing", queue: "grace.queue", name: "grace.queue", nums: 1)]
class GraceConsumer extends ConsumerMessage
{
    use ProducerDelayedMessageTrait;
    use ConsumerDelayedMessageTrait;

    #[Inject]
    protected EventDispatcherInterface $eventDispatcher;

    /**
     * @param $data
     * @param AMQPMessage $message
     * @return string
     */
    public function consumeMessage($data, AMQPMessage $message): string
    {
        if(empty($data['data'])) {
            return Result::DROP;
        }
        //消费中
        $this->eventDispatcher->dispatch(new ConsumeEvent($message, $data));

        //逻辑处理
       //sleep(10);

        //消费后
        $this->eventDispatcher->dispatch(new AfterConsume($message, $data, Result::ACK));

        return Result::ACK;
    }

    /**
     * 设置是否启动amqp
     * @return bool
     */
    public function isEnable(): bool
    {
        return env('AMQP_ENABLE', false);
    }
}
